#!/usr/bin/env python3
# group: img
#
# Test case for NBD's blockdev-add interface
#
# Copyright (C) 2016 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import os
import random
import socket
import stat
import time
import iotests
from iotests import cachemode, aiomode, imgfmt, qemu_img, qemu_nbd, qemu_nbd_early_pipe

NBD_PORT_START      = 32768
NBD_PORT_END        = NBD_PORT_START + 1024
NBD_IPV6_PORT_START = NBD_PORT_END
NBD_IPV6_PORT_END   = NBD_IPV6_PORT_START + 1024

test_img = os.path.join(iotests.test_dir, 'test.img')
unix_socket = os.path.join(iotests.sock_dir, 'nbd.socket')


def flatten_sock_addr(crumpled_address):
    result = { 'type': crumpled_address['type'] }
    result.update(crumpled_address['data'])
    return result


class NBDBlockdevAddBase(iotests.QMPTestCase):
    def blockdev_add_options(self, address, export, node_name):
        options = { 'node-name': node_name,
                    'driver': 'raw',
                    'file': {
                        'driver': 'nbd',
                        'read-only': True,
                        'server': address
                    } }
        if export is not None:
            options['file']['export'] = export
        return options

    def client_test(self, filename, address, export=None,
                    node_name='nbd-blockdev', delete=True):
        bao = self.blockdev_add_options(address, export, node_name)
        result = self.vm.qmp('blockdev-add', **bao)
        self.assert_qmp(result, 'return', {})

        found = False
        result = self.vm.qmp('query-named-block-nodes')
        for node in result['return']:
            if node['node-name'] == node_name:
                found = True
                if isinstance(filename, str):
                    self.assert_qmp(node, 'image/filename', filename)
                else:
                    self.assert_json_filename_equal(node['image']['filename'],
                                                    filename)
                break
        self.assertTrue(found)

        if delete:
            result = self.vm.qmp('blockdev-del', node_name=node_name)
            self.assert_qmp(result, 'return', {})


class QemuNBD(NBDBlockdevAddBase):
    def setUp(self):
        qemu_img('create', '-f', iotests.imgfmt, test_img, '64k')
        self.vm = iotests.VM()
        self.vm.launch()

    def tearDown(self):
        self.vm.shutdown()
        os.remove(test_img)
        try:
            os.remove(unix_socket)
        except OSError:
            pass

    def _try_server_up(self, *args):
        status, msg = qemu_nbd_early_pipe('-f', imgfmt, test_img, *args)
        if status == 0:
            return True
        if 'Address already in use' in msg:
            return False
        self.fail(msg)

    def _server_up(self, *args):
        self.assertTrue(self._try_server_up(*args))

    def test_inet(self):
        while True:
            nbd_port = random.randrange(NBD_PORT_START, NBD_PORT_END)
            if self._try_server_up('-b', 'localhost', '-p', str(nbd_port)):
                break

        address = { 'type': 'inet',
                    'data': {
                        'host': 'localhost',
                        'port': str(nbd_port)
                    } }
        self.client_test('nbd://localhost:%i' % nbd_port,
                         flatten_sock_addr(address))

    def test_unix(self):
        self._server_up('-k', unix_socket)
        address = { 'type': 'unix',
                    'data': { 'path': unix_socket } }
        self.client_test('nbd+unix://?socket=' + unix_socket,
                         flatten_sock_addr(address))


class BuiltinNBD(NBDBlockdevAddBase):
    def setUp(self):
        qemu_img('create', '-f', iotests.imgfmt, test_img, '64k')
        self.vm = iotests.VM()
        self.vm.launch()
        self.server = iotests.VM('.server')
        self.server.add_drive_raw('if=none,id=nbd-export,' +
                                  'file=%s,' % test_img +
                                  'format=%s,' % imgfmt +
                                  'cache=%s,' % cachemode +
                                  'aio=%s' % aiomode)
        self.server.launch()

    def tearDown(self):
        self.vm.shutdown()
        self.server.shutdown()
        os.remove(test_img)
        try:
            os.remove(unix_socket)
        except OSError:
            pass

    # Returns False on EADDRINUSE; fails an assertion on other errors.
    # Returns True on success.
    def _try_server_up(self, address, export_name=None, export_name2=None):
        result = self.server.qmp('nbd-server-start', addr=address)
        if 'error' in result and \
           'Address already in use' in result['error']['desc']:
            return False
        self.assert_qmp(result, 'return', {})

        if export_name is None:
            result = self.server.qmp('nbd-server-add', device='nbd-export')
        else:
            result = self.server.qmp('nbd-server-add', device='nbd-export',
                                     name=export_name)
        self.assert_qmp(result, 'return', {})

        if export_name2 is not None:
            result = self.server.qmp('nbd-server-add', device='nbd-export',
                                     name=export_name2)
            self.assert_qmp(result, 'return', {})

        return True

    def _server_up(self, address, export_name=None, export_name2=None):
        self.assertTrue(self._try_server_up(address, export_name, export_name2))

    def _server_down(self):
        result = self.server.qmp('nbd-server-stop')
        self.assert_qmp(result, 'return', {})

    def do_test_inet(self, export_name=None):
        while True:
            nbd_port = random.randrange(NBD_PORT_START, NBD_PORT_END)
            address = { 'type': 'inet',
                        'data': {
                            'host': 'localhost',
                            'port': str(nbd_port)
                        } }
            if self._try_server_up(address, export_name):
                break

        export_name = export_name or 'nbd-export'
        self.client_test('nbd://localhost:%i/%s' % (nbd_port, export_name),
                         flatten_sock_addr(address), export_name)
        self._server_down()

    def test_inet_default_export_name(self):
        self.do_test_inet()

    def test_inet_same_export_name(self):
        self.do_test_inet('nbd-export')

    def test_inet_different_export_name(self):
        self.do_test_inet('shadow')

    def test_inet_two_exports(self):
        while True:
            nbd_port = random.randrange(NBD_PORT_START, NBD_PORT_END)
            address = { 'type': 'inet',
                        'data': {
                            'host': 'localhost',
                            'port': str(nbd_port)
                        } }
            if self._try_server_up(address, 'exp1', 'exp2'):
                break

        self.client_test('nbd://localhost:%i/%s' % (nbd_port, 'exp1'),
                         flatten_sock_addr(address), 'exp1', 'node1', False)
        self.client_test('nbd://localhost:%i/%s' % (nbd_port, 'exp2'),
                         flatten_sock_addr(address), 'exp2', 'node2', False)
        result = self.vm.qmp('blockdev-del', node_name='node1')
        self.assert_qmp(result, 'return', {})
        result = self.vm.qmp('blockdev-del', node_name='node2')
        self.assert_qmp(result, 'return', {})
        self._server_down()

    def test_inet6(self):
        try:
            socket.getaddrinfo("::0", "0", socket.AF_INET6,
                               socket.SOCK_STREAM, socket.IPPROTO_TCP,
                               socket.AI_ADDRCONFIG | socket.AI_CANONNAME)
        except socket.gaierror:
            # IPv6 not available, skip
            return

        while True:
            nbd_port = random.randrange(NBD_IPV6_PORT_START, NBD_IPV6_PORT_END)
            address = { 'type': 'inet',
                        'data': {
                            'host': '::1',
                            'port': str(nbd_port),
                            'ipv4': False,
                            'ipv6': True
                        } }
            if self._try_server_up(address):
                break

        filename = { 'driver': 'raw',
                     'file': {
                         'driver': 'nbd',
                         'export': 'nbd-export',
                         'server': flatten_sock_addr(address)
                     } }
        self.client_test(filename, flatten_sock_addr(address), 'nbd-export')
        self._server_down()

    def test_unix(self):
        address = { 'type': 'unix',
                    'data': { 'path': unix_socket } }
        self._server_up(address)
        self.client_test('nbd+unix:///nbd-export?socket=' + unix_socket,
                         flatten_sock_addr(address), 'nbd-export')
        self._server_down()

    def test_fd(self):
        self._server_up({ 'type': 'unix',
                          'data': { 'path': unix_socket } })

        sockfd = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        sockfd.connect(unix_socket)

        result = self.vm.send_fd_scm(fd=sockfd.fileno())
        self.assertEqual(result, 0, 'Failed to send socket FD')

        result = self.vm.qmp('getfd', fdname='nbd-fifo')
        self.assert_qmp(result, 'return', {})

        address = { 'type': 'fd',
                    'data': { 'str': 'nbd-fifo' } }
        filename = { 'driver': 'raw',
                     'file': {
                         'driver': 'nbd',
                         'export': 'nbd-export',
                         'server': flatten_sock_addr(address)
                     } }
        self.client_test(filename, flatten_sock_addr(address), 'nbd-export')

        self._server_down()


if __name__ == '__main__':
    iotests.main(supported_fmts=['raw'],
                 supported_protocols=['nbd'])
